package com.yangm.registry;

import com.yangm.core.RpcServiceHelper;
import com.yangm.core.ServiceMeta;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
import org.apache.curator.x.discovery.ServiceInstance;
import org.apache.curator.x.discovery.details.InstanceSerializer;
import org.apache.curator.x.discovery.details.JsonInstanceSerializer;

import java.io.IOException;
import java.util.Collection;
import java.util.Optional;

/**
 * {@link RegistryService} implementation backed by ZooKeeper through Apache Curator's
 * service-discovery recipe.
 *
 * <p>Every service is stored under {@link #ZK_BASE_PATH}, keyed by the name produced by
 * {@link RpcServiceHelper#buildServiceKey}, with the {@link ServiceMeta} itself as the
 * JSON-serialized payload.
 */
public class ZookeeperRegistryService implements RegistryService {
    /** Initial sleep time, in milliseconds, of the exponential-backoff retry policy. */
    public static final int BASE_SLEEP_TIME_MS = 1000;
    /** Maximum number of retries of the exponential-backoff retry policy. */
    public static final int MAX_RETRIES = 3;
    /**
     * Misspelled alias of {@link #MAX_RETRIES}, kept so existing references keep compiling.
     *
     * @deprecated use {@link #MAX_RETRIES}
     */
    @Deprecated
    public static final int MAX_RETIES = MAX_RETRIES;
    /** Base path under which the discovery recipe is configured to store services. */
    public static final String ZK_BASE_PATH = "/mini_rpc";

    /** Kept as a field so that {@link #destroy()} can release the connection. */
    private final CuratorFramework client;
    private final ServiceDiscovery<ServiceMeta> serviceDiscovery;

    /**
     * Connects to ZooKeeper and starts the service-discovery recipe.
     *
     * @param addr connection string passed to {@link CuratorFrameworkFactory#newClient}
     * @throws Exception if the discovery recipe cannot be built or started; the client
     *                   started here is closed before the exception propagates
     */
    public ZookeeperRegistryService(String addr) throws Exception {
        CuratorFramework curator = CuratorFrameworkFactory.newClient(
                addr, new ExponentialBackoffRetry(BASE_SLEEP_TIME_MS, MAX_RETRIES));
        curator.start();

        ServiceDiscovery<ServiceMeta> discovery;
        try {
            InstanceSerializer<ServiceMeta> serializer = new JsonInstanceSerializer<>(ServiceMeta.class);
            discovery = ServiceDiscoveryBuilder.builder(ServiceMeta.class)
                    .client(curator)
                    .serializer(serializer)
                    .basePath(ZK_BASE_PATH)
                    .build();
            discovery.start();
        } catch (Exception e) {
            // Do not leak the already-started client when discovery start-up fails.
            curator.close();
            throw e;
        }
        this.client = curator;
        this.serviceDiscovery = discovery;
    }

    /**
     * Registers the given service with the discovery recipe.
     *
     * @param serviceMeta description of the service instance to publish
     * @throws Exception if the instance cannot be built or registered
     */
    @Override
    public void register(ServiceMeta serviceMeta) throws Exception {
        this.serviceDiscovery.registerService(getServiceMetaServiceInstance(serviceMeta));
    }

    /**
     * Converts a {@link ServiceMeta} into the Curator {@link ServiceInstance} that represents it.
     *
     * <p>The instance id is derived from the address and port rather than left to the builder's
     * default (a freshly generated random id). {@link #unRegister(ServiceMeta)} rebuilds the
     * instance from the same metadata, and it can only address the node created by
     * {@link #register(ServiceMeta)} if both calls yield the same id.
     */
    private ServiceInstance<ServiceMeta> getServiceMetaServiceInstance(ServiceMeta serviceMeta) throws Exception {
        String serviceKey = RpcServiceHelper.buildServiceKey(
                serviceMeta.getServiceName(), serviceMeta.getServiceVersion());
        String instanceId = serviceMeta.getServiceAddress() + ":" + serviceMeta.getServicePort();
        return ServiceInstance.<ServiceMeta>builder()
                .id(instanceId)
                .name(serviceKey)
                .address(serviceMeta.getServiceAddress())
                .port(serviceMeta.getServicePort())
                .payload(serviceMeta)
                .build();
    }

    /**
     * Removes the given service from the discovery recipe.
     *
     * @param serviceMeta description of the service instance to withdraw; must carry the same
     *                    name, version, address and port that were used to register it
     * @throws Exception if the instance cannot be built or unregistered
     */
    @Override
    public void unRegister(ServiceMeta serviceMeta) throws Exception {
        serviceDiscovery.unregisterService(getServiceMetaServiceInstance(serviceMeta));
    }

    /**
     * Looks up a provider for the given service.
     *
     * @param serviceName     service key to query the discovery recipe with
     * @param invokerHashCode hash code of the caller; currently unused (see TODO below)
     * @return the payload of the first instance returned by the discovery recipe
     * @throws Exception if the query fails or no instance is registered for {@code serviceName}
     */
    @Override
    public ServiceMeta discovery(String serviceName, int invokerHashCode) throws Exception {
        // TODO: implement a load-balancing algorithm to choose the concrete provider,
        //  e.g. pick a node from a hash ring using invokerHashCode, or weighted round-robin.
        Collection<ServiceInstance<ServiceMeta>> candidates = serviceDiscovery.queryForInstances(serviceName);
        Optional<ServiceInstance<ServiceMeta>> chosen = candidates.stream().findFirst();
        if (!chosen.isPresent()) {
            throw new Exception("service no provider!");
        }
        return chosen.get().getPayload();
    }

    /**
     * Shuts down the discovery recipe and then the underlying ZooKeeper client.
     *
     * @throws IOException if closing the discovery recipe fails; the client is closed regardless
     */
    @Override
    public void destroy() throws IOException {
        try {
            serviceDiscovery.close();
        } finally {
            client.close();
        }
    }
}
